Map Join implementation

Mapper class
package com.kun.map_join;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;

public class LeftOutJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    private HashMap<String, String> city_info = new HashMap<>();
    private Text outPutKey = new Text();
    private Text outPutValue = new Text();
    private String mapInputStr = null;
    private String[] mapInputSplit = null;
    private String city_secondPart = null;

    /**
     * Runs once before each task. It reads the small file A from the
     * distributed cache and loads its records into an in-memory map.
     */
    @Override
    protected void setup(Context context) throws IOException {
        URI[] cacheFiles = context.getCacheFiles();
        String path = cacheFiles[0].getPath();
        BufferedReader br = new BufferedReader(new FileReader(path));
        try {
            String cityInfo;
            while (null != (cityInfo = br.readLine())) {
                String[] cityPart = cityInfo.split("\\|");
                if (cityPart.length == 5) {
                    city_info.put(cityPart[0], cityPart[1] + "\t" + cityPart[2] + "\t" + cityPart[3] + "\t" + cityPart[4]);
                }
            }
        } finally {
            br.close();
        }
    }

    /**
     * The map side is simple: for each record of B, look up its cityID in
     * the in-memory map built from A. If it is there, the two records are
     * joined on the spot, which is exactly what Map Join does.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String pathName = fileSplit.getPath().getName();
        if (pathName.endsWith("B.txt")) {
            if (value == null || value.toString().equals("")) {
                return;
            }
            mapInputStr = value.toString();
            mapInputSplit = mapInputStr.split("\\|");
            // Filter out malformed records
            if (mapInputSplit.length != 4) {
                return;
            }
            // Check whether the join key exists in the in-memory map
            city_secondPart = city_info.get(mapInputSplit[3]);
            if (city_secondPart != null) {
                this.outPutKey.set(mapInputSplit[3]);
                this.outPutValue.set(city_secondPart + "\t" + mapInputSplit[0] + "\t" + mapInputSplit[1] + "\t" + mapInputSplit[2]);
                context.write(outPutKey, outPutValue);
            }
        }
    }
}
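The core of the map-side join is just the in-memory HashMap built in setup() and probed in map(). A minimal standalone sketch of that lookup, using one sample line from each of the A.txt/B.txt files shown at the end of the post (the class name is made up for illustration), looks like this:

import java.util.HashMap;
import java.util.Map;

// Standalone sketch: build the small-table map from one A-style line,
// then probe it with one B-style line, exactly as the mapper does.
public class MapJoinLookupSketch {
    public static void main(String[] args) {
        Map<String, String> cityInfo = new HashMap<>();

        // A record: id|name|orderid|city_code|is_show
        String aLine = "1|长春|1|901|1";
        String[] a = aLine.split("\\|");
        if (a.length == 5) {
            cityInfo.put(a[0], a[1] + "\t" + a[2] + "\t" + a[3] + "\t" + a[4]);
        }

        // B record: userID|network|flow|cityID -> the join key is field 3 (cityID)
        String bLine = "1|2G|123|1";
        String[] b = bLine.split("\\|");
        String citySecondPart = cityInfo.get(b[3]);
        if (citySecondPart != null) {
            // Same layout the mapper emits: join key, A's fields, then B's fields
            System.out.println(b[3] + "\t" + citySecondPart + "\t" + b[0] + "\t" + b[1] + "\t" + b[2]);
        }
    }
}

Run as plain Java, this prints one joined row in the same column order the job produces for cityID 1.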
Driver class
package com.kun.map_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class LeftOutJoinDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        System.setProperty("hadoop.home.dir", "D:\\software\\master\\winutils-master\\hadoop-2.6.0");
        Configuration configuration = new Configuration();

        // Delete the output directory if it already exists
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path("output");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setNumReduceTasks(0); // map-only job: no shuffle, no reduce phase
        job.addCacheFile(new URI("file:///D:/other/testjoin/A.txt")); // ship the small file A via the distributed cache

        FileInputFormat.setInputPaths(job, new Path("D:\\other\\testjoin")); // map input path
        FileOutputFormat.setOutputPath(job, new Path("output"));             // output path

        job.setJarByClass(LeftOutJoinDriver.class);
        job.setMapperClass(LeftOutJoinMapper.class);
        job.setInputFormatClass(TextInputFormat.class);   // input format
        job.setOutputFormatClass(TextOutputFormat.class); // default output format

        // Map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
    }
}
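The paths in this driver are hard-coded for a local Windows run. As a sketch only (not part of the original code), the same job could take its paths from the command line; the class name and argument layout here are assumptions:

package com.kun.map_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

// Hypothetical variant of the driver above (not in the original post):
// args[0] = input directory holding B.txt, args[1] = URI of the cached A.txt, args[2] = output directory.
public class LeftOutJoinDriverArgs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Remove a stale output directory so the job can be rerun
        Path output = new Path(args[2]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(LeftOutJoinDriverArgs.class);
        job.setMapperClass(LeftOutJoinMapper.class);
        job.setNumReduceTasks(0);            // map-only job: no shuffle, no reduce
        job.addCacheFile(new URI(args[1]));  // small table A distributed to every map task

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, output);

        job.setOutputKeyClass(Text.class);   // TextInputFormat/TextOutputFormat are the defaults
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}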
Reduce Join implementation

This part mainly looks at the weaknesses of reduce join. The reason this approach exists is easy to see: the input data is split up, so each map task processes only a slice of it and cannot see all the records that share a join key. We therefore use the join key as the grouping key on the reduce side, so that every record with the same join key lands in the same reduce call and can be joined there. The obvious drawback is that this moves a large amount of data from the map side to the reduce side, i.e. through the shuffle phase, which makes it inefficient.

Dependencies
<hadoop.version>2.6.0-cdh5.7.0</hadoop.version>

<repository>
  <id>cloudera</id>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
</repository>

<!-- Hadoop dependency -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>
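These fragments live in different sections of the POM. A minimal pom.xml skeleton wiring them together might look like the following; the groupId, artifactId and version are placeholders, not values from the original project:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Placeholder coordinates: adjust to your own project -->
  <groupId>com.kun</groupId>
  <artifactId>hadoop-join</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
  </properties>

  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
  </dependencies>
</project>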
Custom CombineValues class
package com.kun.mapjion;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CombineValues implements WritableComparable<CombineValues> {

    private Text joinKey;    // the join key
    private Text flag;       // flag marking which file the record came from
    private Text secondPart; // everything except the join key

    public Text getJoinKey() {
        return joinKey;
    }

    public void setJoinKey(Text joinKey) {
        this.joinKey = joinKey;
    }

    public Text getFlag() {
        return flag;
    }

    public void setFlag(Text flag) {
        this.flag = flag;
    }

    public Text getSecondPart() {
        return secondPart;
    }

    public void setSecondPart(Text secondPart) {
        this.secondPart = secondPart;
    }

    public CombineValues() {
        this.joinKey = new Text();
        this.flag = new Text();
        this.secondPart = new Text();
    }

    @Override
    public int compareTo(CombineValues o) {
        return this.joinKey.compareTo(o.getJoinKey());
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        this.joinKey.write(dataOutput);
        this.flag.write(dataOutput);
        this.secondPart.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.joinKey.readFields(dataInput);
        this.flag.readFields(dataInput);
        this.secondPart.readFields(dataInput);
    }

    @Override
    public String toString() {
        return "[flag=" + this.flag.toString() + ",joinKey=" + this.joinKey.toString() + ",secondPart=" + this.secondPart.toString() + "]";
    }
}
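CombineValues only needs to honour the Writable contract: write() and readFields() must serialize and deserialize the three Text fields in the same order. A quick local round-trip check (a sketch, not part of the job; the class name is made up) could look like this:

package com.kun.mapjion;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

// Sketch: serialize a CombineValues instance and read it back,
// verifying that write() and readFields() agree on field order.
public class CombineValuesRoundTrip {
    public static void main(String[] args) throws Exception {
        CombineValues original = new CombineValues();
        original.setFlag(new Text("0"));
        original.setJoinKey(new Text("1"));
        original.setSecondPart(new Text("长春\t1\t901\t1"));

        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());

        CombineValues copy = new CombineValues();
        copy.readFields(in);

        // Both lines should print the same [flag=...,joinKey=...,secondPart=...] string
        System.out.println(original);
        System.out.println(copy);
    }
}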
Mapper class
package com.kun.mapjion;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {

    private CombineValues combineValues = new CombineValues();
    private Text flag = new Text();
    private Text joinkey = new Text();
    private Text secondPart = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Find out which input file this split belongs to
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String pathName = fileSplit.getPath().getName();

        // Record comes from file A: tag it with flag 0
        if (pathName.endsWith("A.txt")) {
            String[] splits = value.toString().split("\\|");
            // Filter out malformed records
            if (splits.length != 5) {
                return;
            }
            flag.set("0");
            joinkey.set(splits[0]);
            secondPart.set(splits[1] + "\t" + splits[2] + "\t" + splits[3] + "\t" + splits[4]);

            combineValues.setFlag(flag);
            combineValues.setJoinKey(joinkey);
            combineValues.setSecondPart(secondPart);
            context.write(combineValues.getJoinKey(), combineValues);
        } else if (pathName.endsWith("B.txt")) { // record comes from file B: tag it with flag 1
            String[] splits = value.toString().split("\\|");
            // Filter out malformed records
            if (splits.length != 4) {
                return;
            }
            flag.set("1");
            joinkey.set(splits[3]);
            secondPart.set(splits[0] + "\t" + splits[1] + "\t" + splits[2]);

            combineValues.setFlag(flag);
            combineValues.setJoinKey(joinkey);
            combineValues.setSecondPart(secondPart);
            context.write(combineValues.getJoinKey(), combineValues);
        }
    }
}
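To make the tagging concrete: using one sample line from each of the data files shown at the end of the post, A contributes field 0 as the join key (flag 0) while B contributes field 3, the cityID (flag 1), so the two records end up in the same reduce group. A standalone sketch (the class name is made up) of the records the mapper emits:

package com.kun.mapjion;

import org.apache.hadoop.io.Text;

// Sketch: how one line from each file is tagged before it is written out.
public class TaggedRecordSketch {
    public static void main(String[] args) {
        // A record: id|name|orderid|city_code|is_show
        String[] a = "1|长春|1|901|1".split("\\|");
        CombineValues fromA = new CombineValues();
        fromA.setFlag(new Text("0"));
        fromA.setJoinKey(new Text(a[0]));
        fromA.setSecondPart(new Text(a[1] + "\t" + a[2] + "\t" + a[3] + "\t" + a[4]));

        // B record: userID|network|flow|cityID
        String[] b = "1|2G|123|1".split("\\|");
        CombineValues fromB = new CombineValues();
        fromB.setFlag(new Text("1"));
        fromB.setJoinKey(new Text(b[3]));
        fromB.setSecondPart(new Text(b[0] + "\t" + b[1] + "\t" + b[2]));

        // Both records share joinKey "1", so they meet in the same reduce group
        System.out.println(fromA);
        System.out.println(fromB);
    }
}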
Reducer class
package com.kun.mapjion;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;

public class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {

    private static final Logger logger = LoggerFactory.getLogger(LeftOutJoinReducer.class);

    // Left-table records of the current group
    private ArrayList<Text> leftTable = new ArrayList<Text>();
    // Right-table records of the current group
    private ArrayList<Text> rightTable = new ArrayList<Text>();
    private Text secondPar = null;
    private Text output = new Text();

    /**
     * reduce() is called once per group.
     */
    @Override
    protected void reduce(Text key, Iterable<CombineValues> values, Context context) throws IOException, InterruptedException {
        leftTable.clear();
        rightTable.clear();

        /*
         * Split the group's records by source file.
         * Caveat: if a single group contains too many records, buffering
         * them here can cause an OOM on the reduce side. Before tackling a
         * distributed problem it pays to understand how the data is
         * distributed and pick the most suitable approach; that helps avoid
         * both OOM errors and severe data skew.
         */
        for (CombineValues cv : values) {
            secondPar = new Text(cv.getSecondPart().toString());
            // Left table
            if ("0".equals(cv.getFlag().toString().trim())) {
                leftTable.add(secondPar);
            }
            // Right table
            else if ("1".equals(cv.getFlag().toString().trim())) {
                rightTable.add(secondPar);
            }
        }

        // Debug output: dump both sides of the current group
        System.out.println("A:" + leftTable.toString());
        System.out.println("B:" + rightTable.toString());

        for (Text leftPart : leftTable) {
            for (Text rightPart : rightTable) {
                output.set(leftPart + "\t" + rightPart);
                context.write(key, output);
            }
        }
    }
}
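For the sample data at the end of the post, the group for join key 1 holds one left-table record and two right-table records, so the nested loop emits two joined rows. A standalone sketch of just that per-group logic, filled in with those sample values (the class name is made up):

import java.util.ArrayList;
import java.util.List;

// Sketch of the reducer's per-group nested loop, using the group for
// join key "1" from the sample A.txt/B.txt data.
public class ReduceJoinGroupSketch {
    public static void main(String[] args) {
        String key = "1";

        List<String> leftTable = new ArrayList<>();
        leftTable.add("长春\t1\t901\t1");          // from A.txt: 1|长春|1|901|1

        List<String> rightTable = new ArrayList<>();
        rightTable.add("1\t2G\t123");              // from B.txt: 1|2G|123|1
        rightTable.add("3\t3G\t555");              // from B.txt: 3|3G|555|1

        // Same cartesian product the reducer performs for one key
        for (String leftPart : leftTable) {
            for (String rightPart : rightTable) {
                System.out.println(key + "\t" + leftPart + "\t" + rightPart);
            }
        }
    }
}

Run as plain Java, this prints the two rows for cityID 1 that appear in the result listing below; in the real job their relative order depends on the iteration order of the grouped values.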
Driver class
package com.kun.mapjion;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class LeftOutJoinDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.setProperty("hadoop.home.dir", "D:\\hadoop2.6_Win_x64-master");
        Configuration configuration = new Configuration();

        // Delete the output directory if it already exists
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path("output");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setJarByClass(LeftOutJoinDriver.class);

        FileInputFormat.setInputPaths(job, new Path("D:/Documents/test2/")); // map input path
        FileOutputFormat.setOutputPath(job, new Path("output"));             // reduce output path

        job.setMapperClass(LeftOutJoinMapper.class);
        job.setReducerClass(LeftOutJoinReducer.class);
        job.setInputFormatClass(TextInputFormat.class);   // input format
        job.setOutputFormatClass(TextOutputFormat.class); // default output format

        // Map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);
        // Reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
    }
}
Result
1	长春	1	901	1	3	3G	555
1	长春	1	901	1	1	2G	123
2	吉林	2	902	1	2	3G	333
3	四平	3	903	1	4	2G	777
4	松原	4	904	1	5	3G	666
File A.txt (id|name|orderid|city_code|is_show):
0|其他|9999|9999|0
1|长春|1|901|1
2|吉林|2|902|1
3|四平|3|903|1
4|松原|4|904|1
5|通化|5|905|1
6|辽源|6|906|1
7|白城|7|907|1
8|白山|8|908|1
9|延吉|9|909|1
File B.txt (userID|network|flow|cityID):
1|2G|123|1
2|3G|333|2
3|3G|555|1
4|2G|777|3
5|3G|666|4